Quartz 集群实战及原理解析

您所在的位置:网站首页 quartz 集群和非集群区别 Quartz 集群实战及原理解析

Quartz 集群实战及原理解析

#Quartz 集群实战及原理解析| 来源: 网络整理| 查看: 265

原标题:Quartz 集群实战及原理解析

选Quartz的团队基本上是冲着Quartz本身实现的集群去的, 不然JDK自带Timer就可以实现相同的功能, 而Timer存在的单点故障是生产环境上所不能容忍的。 在自己造个有负载均衡和支持集群(高可用、伸缩性)的调度框架又影响项目的进度, 所以大多数团队都直接使用了Quartz来作为调度框架。

一、 Quartz集群的架构图:

二、 Quartz集群配置:

-->

org.quartz.impl.jdbcjobstore.JobStoreCMT

EventScheduler

AUTO

org.quartz.simpl.SimpleThreadPool

50

5

true

60000

org.quartz.impl.jdbcjobstore.StdJDBCDelegate

SCHEDULER_

10

true

20000

true

false

myDS

myDS

false

com.mysql.jdbc.Driver

${db.url}

${db.username}

${db.password}

10

select 0 from dual

三、 集群源码分析

Quartz如何保证多个节点的应用只进行一次调度(即某一时刻的调度任务只由其中一台服务器执行)?

正如上面架构图所示, Quartz的集群是在同一个数据库下, 由数据库的数据来确定调度任务是否正在执行, 正在执行则其他服务器就不能去执行该行调度数据。 这个跟很多项目是用Zookeeper做集群不一样, 这些项目是靠Zookeeper选举出来的的服务器去执行, 可以理解为Quartz靠数据库选举一个服务器来执行。

如果之前看过这篇Quartz按时启动原理就应该了解到Quartz最主要的一个类QuartzSchedulerThread职责是触发任务, 是一个不断运行的Quartz主线程, 还是从这里入手了解集群原理。

集群配置里面有一个配置项:

org.quartz.impl.jdbcjobstore.JobStoreCMT

源码可以看到JobStoreCMT extends JobStoreSupport, 在QuartzSchedulerThread的run方法里面调用的acquireNextTriggers、 triggersFired、 releaseAcquiredTrigger方法都进行了加锁处理。

以acquireNextTriggers为例:

而LOCK_TRIGGER_ACCESS其实就是一个Java常量

protected static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";

这个常量传入加锁的核心方法executeInNonManagedTXLock: 处理逻辑前获取锁, 处理完成后在finally里面释放锁(一种典型的同步处理方法)

protected T executeInNonManagedTXLock(

String lockName,

TransactionCallback txCallback, final TransactionValidator txValidator) throws JobPersistenceException {

boolean transOwner = false;

Connection conn = null;

try {

if (lockName != null) {

// If we aren't using db locks, then delay getting DB connection

// until after acquiring the lock since it isn't needed.

if (getLockHandler().requiresConnection()) {

conn = getNonManagedTXConnection();

}

// 获取锁

transOwner = getLockHandler().obtainLock(conn, lockName);

}

if (conn == null) {

conn = getNonManagedTXConnection();

}

final T result = txCallback.execute(conn);

try {

commitConnection(conn);

} catch (JobPersistenceException e) {

rollbackConnection(conn);

if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback() {

@Override

public Boolean execute(Connection conn) throws JobPersistenceException {

return txValidator.validate(conn, result);

}

})) {

throw e;

}

}

Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();

if(sigTime != null && sigTime >= 0) {

signalSchedulingChangeImmediately(sigTime);

}

return result;

} catch (JobPersistenceException e) {

rollbackConnection(conn);

throw e;

} catch (RuntimeException e) {

rollbackConnection(conn);

throw new JobPersistenceException("Unexpected runtime exception: "

+ e.getMessage(), e);

} finally {

try {

// 释放锁

releaseLock(lockName, transOwner);

} finally {

cleanupConnection(conn);

}

}

}

getLockHandler那么可以思考下这个LockHandler怎么来的?

最后发现在JobStoreSupport的initail方法赋值了:

public void initialize(ClassLoadHelper loadHelper,

SchedulerSignaler signaler) throws SchedulerConfigException {

...

// If the user hasn't specified an explicit lock handler, then

// choose one based on CMT/Clustered/UseDBLocks.

if (getLockHandler() == null) {

// If the user hasn't specified an explicit lock handler,

// then we *must* use DB locks with clustering

if (isClustered()) {

setUseDBLocks(true);

}

if (getUseDBLocks()) {

...

// 在初始化方法里面赋值了

setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));

} else {

getLog().info(

"Using thread monitor-based data access locking (synchronization).");

setLockHandler(new SimpleSemaphore());

}

}

}

可以在StdRowLockSemaphore里面看到:

public static final String SELECT_FOR_LOCK = "SELECT * FROM "

+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST

+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

public static final String INSERT_LOCK = "INSERT INTO "

+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("

+ SCHED_NAME_SUBST + ", ?)";

可以看出采用了悲观锁的方式对triggers表进行行加锁, 以保证任务同步的正确性。

当线程使用上述的SQL对表中的数据执行操作时,数据库对该行进行行加锁; 于此同时, 另一个线程对该行数据执行操作前需要获取锁, 而此时已被占用, 那么这个线程就只能等待, 直到该行锁被释放。

Quartz的锁存放在:

CREATE TABLE `scheduler_locks` (

`SCHED_NAME` varchar(120) NOT NULL COMMENT '调度名',

`LOCK_NAME` varchar(40) NOT NULL COMMENT '锁名',

PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

锁名和上述常量一一对应:

有可能你的任务不能支持并发执行(因为有可能任务还没执行完, 下一轮就trigger了, 如果没做同步处理可能造成严重的数据问题), 那么在任务类加上注解:

@DisallowConcurrentExecution

设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行

四、 参考资料

Quartz官网: http://quartz-scheduler.org/documentation/quartz-2.x/tutorials/tutorial-lesson-11

关注「JAVA思考者」

看更多 Java 技术精选文章

↓↓↓返回搜狐,查看更多

责任编辑:



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3